package producer

import (
	"github.com/Shopify/sarama"
	"time"
)



// sync producer

type FinderKafkaSyncProducer struct {
	Producer sarama.SyncProducer
	brokers []string

}

func NewFinderKafkaSyncProducer(brokers []string) (*FinderKafkaSyncProducer,error) {
	kafka:=&FinderKafkaSyncProducer{brokers:brokers}


	config := sarama.NewConfig()
	//等待服务器所有副本都保存成功后的响应
	config.Producer.RequiredAcks = sarama.WaitForAll
	//随机向partition发送消息
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	//是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	config.Producer.Timeout = 5 * time.Second

	//设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
	//注意，版本设置不对的话，kafka会返回很奇怪的错误，并且无法成功发送消息
	config.Version = sarama.V0_10_0_1


	p, err := sarama.NewSyncProducer(brokers, config)
	kafka.Producer=p

	if err!=nil{
		return nil ,err
	}else{
		return kafka,nil
	}
}

// 发送同步消息
func (this *FinderKafkaSyncProducer) OnProduce(topic string,data interface{}) (partition int32, offset int64, err error) {


	// 发送的消息,主题。

	msg:=&sarama.ProducerMessage{}
	msg.Topic=topic

	switch data.(type) {
	case string:
		msg.Value=sarama.ByteEncoder(data.(string))
	case []byte:
		msg.Value=sarama.ByteEncoder(data.([]byte))
	default:

	}

	partition,offset,err= this.Producer.SendMessage(msg)
	return partition,offset,err
}

